package com.galeno.练习

import ch.hsr.geohash.GeoHash
import com.galeno.utils.SparkUtil
import org.apache.spark.rdd.{JdbcRDD, RDD}

import java.sql.{DriverManager, PreparedStatement, ResultSet}

/**
 * @Title: GeoHashDemo1
 * @Description: Reads reference coordinates from MySQL via JdbcRDD, converts each
 *               (lat, lng) pair to a 6-character geohash, and writes the
 *               (geohash, province, city, region) tuples back to MySQL.
 * @author galeno
 * @date 2021/9/2 20:30
 */
object GeoHashDemo1 {

  /**
   * Entry point: loads (lat, lng, province, city, region) rows from MySQL,
   * maps each coordinate to a 6-character geohash on the executors, and
   * writes the resulting tuples back into the `ref_geohash` table.
   */
  def main(args: Array[String]): Unit = {
    val sc = SparkUtil.getSc

    // JdbcRDD requires a query with exactly two '?' placeholders; it fills
    // them with sub-range bounds derived from (lowerBound, upperBound,
    // numPartitions). Here lat in [-90, 90] covers every row in one partition.
    val getConn = () => DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "root")
    val sql = "select lat,lng,province,city,region from ref_zb where lat>=? and lat<=?"

    // Row mapper (runs on executors): reads one ResultSet row and converts
    // the coordinate pair to a 6-character geohash string.
    val mapRow = (rs: ResultSet) => {
      val lat = rs.getDouble(1)
      val lng = rs.getDouble(2)
      val province = rs.getString(3)
      val city = rs.getString(4)
      val region = rs.getString(5)
      val geohash = GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 6)
      (geohash, province, city, region)
    }

    val jdbcRdd: JdbcRDD[(String, String, String, String)] =
      new JdbcRDD(sc, getConn, sql, -90, 90, 1, mapRow)

    // Write the tuples back to MySQL: one connection per partition, batched
    // inserts (one round trip per partition instead of one per row), and
    // try/finally so the statement and connection are always released even
    // when an insert fails mid-partition.
    jdbcRdd.foreachPartition(iter => {
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "root")
      try {
        val stmt: PreparedStatement =
          conn.prepareStatement("insert into ref_geohash values (?,?,?,?)")
        try {
          iter.foreach { case (geohash, province, city, region) =>
            stmt.setString(1, geohash)
            stmt.setString(2, province)
            stmt.setString(3, city)
            stmt.setString(4, region)
            stmt.addBatch()
          }
          stmt.executeBatch()
        } finally {
          stmt.close()
        }
      } finally {
        conn.close()
      }
    })

    sc.stop()
  }
}
